package com.niit.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ConsumerByHandSync {

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"BD2_2");
        //手动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("BD2_2");
        consumer.subscribe(topics);

        //消费数据
        while (true){
            ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String,String> data:consumerRecords) {
                System.out.println(data);
            }
            //手动同步提交 --> 把偏移量提交到broker中得应答之后再去拉取数据
            //consumer.commitSync();
            //手动异步提交 --> 把偏移量提交到broker中 无论是否得到应答都会继续拉取数据
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e == null){
                        System.out.println("提交偏移量成功"+map);
                    }else{
                        System.out.println(e.toString());
                    }
                }
            });

        }

    }
}
